卡哇勾嘎!!!想必大家都有開發過Publisher-Subscriber架構,有些開發者可能是透過第三放套件進行介接,如:Redis、Kafka或RabbitMQ等套件,或者您系統是在JAVA 8以前開發的,可能是採用RxJAVA或者自己實作Observable Pattern進行管理各項訊息傳播,小編今日要介紹的Spring Reactor套件與RxJava 2採用共同一套接口API標準Reactive Streams Commons,故說明他們的最終目的都是一致的,且這些API都有通用性,如果您曾經有RxJava的開發經驗的話,在今天這個Flux初階介紹,想必是探囊取物一般的簡單,想不想變忍者龜一樣,可以自由地在下水道自由行走,那現在我們就開始來深入分析囉。
Reactor 3是基於JDK中提供的java.util.function來設計實現的,可以很輕鬆地從Java.util.stream.Stream轉為Flux(RxJava中的Flowable類型),亦可輕鬆轉回Stream,也可很快速地實現CompletableFuture與Mono(Mono也支援Publisher接口,可被理解為RxJava 2中對Single的背壓(Back Pressure)加強版)之間的互相轉換,所以可以很輕鬆且安全的基於Optional類型的元素創建Mono,所以在操作上可快方便的應用於Spring Framework 5,適用於各種新版的JDK,也是小編為什麼一開始選用Java 15的原因囉。
根據下圖我們可以看到三角行為開始執行訂閱,圓圈可以視為一個物件、I可視為輸入原子(int,long,float等)、X可視為關閉訂閱頻道(channel),所有推播內容會循序發送給個訂閱者,若推播端發生錯誤或關閉時,會自動將所有訂閱者進行關閉,並停止進行接收任何內容。
圖一、實現Publisher接口原理
小編延續前幾天的架構,新增一個推播產品控制器,此產品可推播給台灣及中國兩個地區國家共同銷售,故小編先建立一個產品推播服務(CacheSubscribeService),由台灣(SeaFoodRetailerServiceImpl)區服務及中國(ChinaSeaFoodRetailerServiceImpl)區服務當訂閱者,透過以下範例可得知,小編建立一個水池道稱為seaFoodSink,並建立一個通量道(Flux)配置推播者入口道(sink),在配置訂閱者排成類型及行為後,即可開始進行產品的推播任務,控制器僅需把產品放入seaFoodSink中,即可讓相關服務開始進行任務,以下程式碼範例提供參考。
// Publisher 服務端配置
@Service
public class CacheSubscribeServiceImpl implements CacheSubscribeService {
Logger logger = LoggerFactory.getLogger(CacheSubscribeServiceImpl.class);
private static FluxSink<SeaFood> seaFoodSink;
@Autowired
@Qualifier("seaFoodRetailService")
SeaFoodRetailerService taiwanSeaFoodRetailerService;
@Autowired
@Qualifier("chinaSeaFoodRetailService")
SeaFoodRetailerService chinaSeaFoodRetailerService;
@PostConstruct
public void init() {
Flux.<SeaFood>create(sink -> this.seaFoodSink = sink)
.doOnNext(seaFood -> {
try {
taiwanSeaFoodRetailerService.createSeaFood(seaFood);
chinaSeaFoodRetailerService.createSeaFood(seaFood);
} catch (SeaFoodRetailerGenericException e) {
logger.error("Create Sea Food into all Place Fail. ex:" + e.toString());
}
})
.onErrorReturn(new SeaFood())
.subscribeOn( Schedulers.elastic())
.subscribe(seaFood -> {
System.out.println("Subscribe model : " + new Gson().toJson(seaFood));
});
}
@Override
public SeaFood subscribeAllLocationSeaFoodProducts(SeaFood seaFood) {
seaFoodSink.next(seaFood);
return seaFood;
}
}
// 控制器配置
@RestController
public class PublishProductController extends ControllerBase {
@Autowired
CacheSubscribeService cacheSubscribeService;
@PostMapping(
value="/${sea.food.api.all}/create",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE
)
ResponseEntity<SeaFood> createSeaFood(@RequestBody SeaFood entity) throws SeaFoodRetailerGenericException {
return new ResponseEntity<>(
cacheSubscribeService.subscribeAllLocationSeaFoodProducts(entity)
, HttpStatus.CREATED
);
}
}
測試結果,進行觸發推播產品
推播產品
台灣區及中國區產品均已加入
故我們可以看到透過Flux可以快速的傳播各項物件,建議最佳的使用環境是在Socket服務上喔,這樣雙邊介面都可快速達到映射物件轉到作用。
由下圖可看出,LambdaSubscriber實現了CoreSubscriber接口,該接口可衍生出各式各樣的訂閱者,對於生產者Publisher接口,也是可衍生多個,Flux留了抽象subscribe方法,提供給各類別具體實現類來實現,CoreSubscriber繼承了org.reactivestreams.Subscriber接口,並加入一些Reactor3特有的Context功能來實現,所以我們可得知此訂閱邏輯概念,如果在LambdaSubscriber中的引數值subscriptionConsumer不為空的,就會觸發onSubscribe方法,反之,則自動觸發onNext方法,詳細各位可參照程式碼邏輯,這邊就不在多加詳述。
圖二、訊息發送流程圖
透過API監測結果,我們可看到此API創建順利,並回傳成功建立的代碼201,無相關異常產生。
Java 編成方法論 - 響應式Spring Reactor 3設計與實現
Reactive Spring實戰 -- 理解Reactor的設計與實現